Java 并发基础(四)

CopyOnWriteArrayList 简介

并发包中的并发 List 只有 CopyOnWriteArrayList。CopyOnWriteArrayList 是一个线程 安全的 ArrayList,对其进行的修改操作都是在底层的一个复制的数组(快照)上进行的, 也就是使用了写时复制策略。

每个 CopyOnWriteArrayList 对象里面有一个 array 数组对象用来存放具体元素,ReentrantLock 独占锁对象用来保证同时只有一个线程 对 array 进行修改(Java 8 的实现方式 ,Java 17 已经改为了 synchronized)。

1
2
3
4
5
6
7
8
9
10
11
12
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

private transient volatile Object[] array;
final Object[] getArray() {
return array;
}
final void setArray(Object[] a) {
array = a;
}
// ...
}

如果让我们自己做一个写时复制的线程安全的 list 我们会怎么做,有哪些点需要考虑。

  • 何时初始化 list,初始化的 list 元素个数为多少,list 是有限大小吗?
  • 如何保证线程安全,比如多个线程进行读写时如何保证是线程安全的?
  • 如何保证使用迭代器遍历 list 时的数据一致性?


主要方法源码解析

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 创建一个大小为 0 的 Object 数组作为 array 的初始值
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}

// 创建一个list,其内部元素是入参toCopyIn的副本
public CopyOnWriteArrayList(E[] toCopyIn) {
setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}

// 入参为集合,将集合里面的元素复制到本list
public CopyOnWriteArrayList(Collection<? extends E> c) {
Object[] es;
if (c.getClass() == CopyOnWriteArrayList.class)
es = ((CopyOnWriteArrayList<?>)c).getArray();
else {
es = c.toArray();
if (c.getClass() != java.util.ArrayList.class)
es = Arrays.copyOf(es, es.length, Object[].class);
}
setArray(es);
}


添加元素

CopyOnWriteArrayList 中用来添加元素的函数有 add(E e)、add(int index, E element)、addIfAbsent(E e) 和 addAllAbsent(Collection<? extends E> c) 等,它们的原理类似,我们以 add(E e) 为例来说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean add(E e) {
// 获取独占锁 (1)
final ReentrantLock lock = this.lock;
lock.lock();
try {
// (2) 获取array
Object[] elements = getArray();
// (3) 复制array到新数组,添加元素到新数组
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
// (4) 使用新数组替换添加前的数组
setArray(newElements);
return true;
} finally {
// (5) 释放独占锁
lock.unlock();
}
}


获取指定位置元素

使用 E get(int index) 获取下标为 index 的元素,如果元素不存在则抛出 IndexOutOfBoundsException 异常。

1
2
3
4
5
6
7
8
9
10
11
public E get(int index) {
return get(getArray(), index);
}

final Object[] getArray() {
return array;
}

private E get(Object[] a, int index) {
return (E) a[index];
}

在如上代码中,当线程 x 调用 get 方法获取指定位置的元素时,分两步走,首先获取 array 数组(这里命名为步骤 A),然后通过下标访问指定位置的元素(这里命名为步骤 B), 这是两步操作,但是在整个过程中并没有进行加锁同步。

1
2
线程x ---- CopyOnWriteArrayList.array(快照1) ---- 堆内存[1,2,3]
线程y ---- CopyOnWriteArrayList.array(快照2) ---- 堆内存[2,3]

由于执行步骤 A 和步骤 B 没有加锁,这就可能导致在线程 x 执行完步骤 A 后执行步 骤 B 前,另外一个线程 y 进行了 remove 操作,假设要删除元素 1。remove 操作首先会获 取独占锁,然后进行写时复制操作,也就是复制一份当前 array 数组,然后在复制的数组里面删除线程 x 通过 get 方法要访问的元素 1,之后让 array 指向复制的数组。而这时候 array 之前指向的数组的引用计数为 1 而不是 0,因为线程 x 还在使用它,这时线程 x 开始 执行步骤 B,步骤 B 操作的数组是线程 y 删除元素之前的数组,所以,虽然线程 y 已经删除了 index 处的元素,但是线程 x 的步骤 B 还是会返回 index 处的元素,这其实就是写时复制策略产生的弱一致性问题。


修改指定元素

使用 E set(int index,E element)修改 list 中指定元素的值,如果指定位置的元素不存在则抛出 IndexOutOfBoundsException 异常,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public E set(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
E oldValue = get(elements, index);
if (oldValue != element) {
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element;
setArray(newElements);
} else {
// 为了保证 volatile 语义,还是需要重新设置 array,虽 然 array 的内容并没有改变。
setArray(elements);
}
return oldValue;
} finally {
lock.unlock();
}
}


删除元素

删除 list 里面指定的元素 , 可以使用 E remove(int index) 、boolean remove(Object o) 和 boolean remove(Object o, Object[] snapshot, int index) 等方法,它们的原理一样。以 remove(int index) 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public E remove(int index) {
// 获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取数组
Object[] elements = getArray();
int len = elements.length;
// 获取指定元素
E oldValue = get(elements, index);
int numMoved = len - index - 1;
// 如果要删除的是最后一个元素
if (numMoved == 0) {
setArray(Arrays.copyOf(elements, len - 1));
} else {
// 分两次复制删除后剩余的元素到新数组
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index, numMoved);
// 使用新数组代替老数组
setArray(newElements);
}
return oldValue;
} finally {
// 释放锁
lock.unlock();
}
}

如上代码其实和新增元素的代码类似,首先获取独占锁以保证删除数据期间其他线程 不能对 array 进行修改,然后获取数组中要被删除的元素,并把剩余的元素复制到新数组, 之后使用新数组替换原来的数组,最后在返回前释放锁。


弱一致性的迭代器

遍历列表元素可以使用迭代器。在讲解什么是迭代器的弱一致性前,先举一个例子来说明如何使用迭代器。

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
CopyOnWriteArrayList<String> arrayList = new CopyOnWriteArrayList<>();
arrayList.add("aaa");
arrayList.add("bbb");
Iterator<String> itr = arrayList.iterator();
while(itr.hasNext()){
System.out.println(itr.next());
}
}

所谓弱一致性是指返回迭代器后,其他线程对 list 的增删改对迭代器是不可见的,下面看看这是如何做到的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}

static final class COWIterator<E> implements ListIterator<E> {
// array的快照版本
private final Object[] snapshot;
// 数组下标
private int cursor;
// 构造函数
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}
// 是否遍历结束
public boolean hasNext() {
return cursor < snapshot.length;
}
// 获取元素
public E next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return (E) snapshot[cursor++];
}
}

在如上代码中,当调用 iterator() 方法获取迭代器时实际上会返回一个 COWIterator 对 象,COWIterator 对象的 snapshot 变量保存了当前 list 的内容,cursor 是遍历 list 时数据的下标。

为什么说 snapshot 是 list 的快照呢?明明是指针传递的引用啊,而不是副本。如果 在该线程使用返回的迭代器遍历元素的过程中,其他线程没有对 list 进行增删改,那么 snapshot 本身就是 list 的 array,因为它们是引用关系。但是如果在遍历期间其他线程对该 list 进行了增删改,那么 snapshot 就是快照了,因为增删改后 list 里面的数组被新数组替换了, 这时候老数组被 snapshot 引用。这也说明获取迭代器后,使用该迭代器元素时,其他线程对该 list 进行的增删改不可见,因为它们操作的是两个不同的数组,这就是弱一致性。下面通过一个例子来演示多线程下迭代器的弱一致性的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Demo {
private static volatile CopyOnWriteArrayList<String> arrayList = new CopyOnWriteArrayList<>();

public static void main(String[] args) throws InterruptedException {
arrayList.add("aaa");
arrayList.add("bbb");
arrayList.add("ccc");
arrayList.add("ddd");
arrayList.add("eee");

Thread t1 = new Thread(() -> {
arrayList.set(1, "BBB");
arrayList.remove(2);
arrayList.remove(3);
}, "t1");

// 保证在修改线程启动前获取迭代器
Iterator<String> itr = arrayList.iterator();
// 启动子线程,并等待子线程执行完毕
t1.start();
t1.join();

// 迭代元素
while(itr.hasNext()){
System.out.println(itr.next());
}
}
}
1
2
3
4
5
aaa
bbb
ccc
ddd
eee

主线程在子线程执行完毕后使用获取的迭代器遍历数组元素,从输出结果我们知道, 在子线程里面进行的操作一个都没有生效,这就是迭代器弱一致性的体现。


Java17 的原子性实现

通过上面的讨论我们知道,Java8 采用 ReentrantLock 实现操作的原子性。为什么 Java17 改成了 synchronized 呢?只是因为:

  • JVM 对 synchronized 的持续优化:
    • 偏向锁优化(在 Java 15 部分调整)
    • 锁消除、锁粗化技术的成熟
    • 自旋锁优化,减少了线程的挂起开销
    • 在低竞争的场景下,synchronized 性能已经接近甚至超过了 ReentrantLock
    • 这个具体情境下,最主要的开销在数组的复制,锁的选择对整体性能 影响非常有限。
  • CopyOnWriteArrayList 的典型使用场景是 读多写少,在这种场景下完全无锁性能非常高,因为写操作不频繁,所以锁竞争通常不激烈。
  • CopyOnWriteArrayList 的实现只需要互斥访问,不需要 ReentrantLock 的高级特性
    • 不需要公平性锁
    • 没有尝试锁的需求,写入必须等待,不需要 tryLock
    • 无条件变量,不需要 Condition
    • 锁持有时间短,只是复制数组,很快就会释放
  • Java 并发库的演进方向:
    • 优先使用内置锁:除非有明确需求,否则用 synchronized
    • 简化并发代码:减少开发者出错的可能
    • 依赖 JVM 优化:让运行时做更多的优化
    • 保持 API 的稳定:对使用者完全透明

具体的改动细节,Java 8 (使用 ReentrantLock):

1
2
3
4
5
6
7
8
9
10
// 每一个实例都持有一个锁对象
final transient ReentrantLock lock = new ReentrantLock();

// 加锁需要显示调用
lock.lock();
try {
// 操作
} finally {
lock.unlock();
}

Java 17 (使用 synchronized):

1
2
3
4
5
6
7
// 每一个实例都持有一个锁对象
final transient Object lock = new Object();

// 使用 synchronized 块
synchronized {
// 操作
}